iT邦幫忙

2023 iThome 鐵人賽

DAY 24
0

終於要開始遊戲了,在此之前我們還需要將連線和房間、玩家資料做綁定,這樣遊戲伺服器不會搞錯資料發送的對象。前面會講解概念的部分,也會附上目前實作,從開房間到開始遊戲這部分是可以運作的,我們之後會再加上遊戲運作的部分。

概念

在 python websockets 套件中並不像 socketio 那樣有內建房間等概念,所以我們需要自己維護房間的資訊。

在遊戲開始之後,所有遊戲相關的操作都還是透過 websocket 的連線物件 (後面都以 conn 代稱) 讀取,但遊戲伺服器其實不知道這個連線對應了哪個房間?哪位玩家?所以在遊戲之前我們需要額外建立一個「連線對應(房間&玩家)」的對照表,這樣子遊戲伺服器收到封包資料的時候,它就可以查詢這個連線對應的房號和玩家編號。

那我們要用什麼來識別連線物件呢?還好 websockets 套件的連線物件自己本身都自帶一個 id (uuid v4) 這樣子我們就可以安心地拿來使用了

房間狀態

room_conns = {} # 定義對照表
room_coons[room_id] = [conn1, conn2] # 以 room_id 作為 key, 對應到房間內的所有 conn

連線綁定

player_conns = {} # 定義對照表
conns = [room_id] # 取得房間內所有的 conn
player_conns[conns[0].id] = (room_id, 1) # 以 conn_id 作為 key,對應 (room_id, player_id)
player_conns[conns[1].id] = (room_id, 2) # 第二位玩家的對照,同上

可能的應用場景

  1. 收到遊戲操作指令,判斷該連線的玩家是否可以行動?如果玩家可以行動,則執行操作指令
  2. 遊戲伺服器發送遊戲開始通知 & 遊戲初始狀態給雙方玩家
  3. 遊戲伺服器發送遊戲回合開始通知給特定玩家

實作

game.py

import random

def init_game():
  # 初始玩家生命值
  initial_health = 15

  # 初始卡牌堆
  player1_deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  player2_deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  player1_hand = [player1_deck.pop(random.randint(0, len(player1_deck) - 1))]
  player2_hand = [player2_deck.pop(random.randint(0, len(player2_deck) - 1))]

  # 遊戲狀態字典
  game_state = {
    "player1_health": initial_health,
    "player2_health": initial_health,
    "player1_deck": player1_deck,
    "player2_deck": player2_deck,
    "player1_hand": player1_hand,
    "player2_hand": player2_hand,
    "current_player": 1,  # 1代表玩家1,2代表玩家2
    "winner": None  # 初始沒有獲勝者
  }
  return game_state

def draw_card(game_state):
  current_player = game_state["current_player"]
  deck = game_state[f"player{current_player}_deck"]
  hand = game_state[f"player{current_player}_hand"]

  if len(deck) > 0:
    # 從卡牌堆中隨機抽一張卡牌
    card_index = random.randint(0, len(deck) - 1)
    card_id = deck.pop(card_index)
    # 把卡牌加入手牌
    hand.append(card_id)
    print(f'玩家{current_player} 抽出 {card_id} 並加入手牌')

  # 更新遊戲狀態
  updated_state = {
    f"player{current_player}_deck": deck,
    f"player{current_player}_hand": hand,
  }
  return updated_state

# TODO 事前排除 card_id 不在玩家手牌中的案例
def play_card(game_state, card_id):
  current_player = game_state["current_player"]
  other_player = 2 if current_player == 1 else 1  # 對手
  hand = game_state[f"player{current_player}_hand"]
  other_player_health = game_state[f"player{other_player}_health"]

  if card_id in hand:
    # 找到卡牌並計算傷害值(這裡可以根據卡牌的屬性進行計算)
    damage = card_id
    # 更新對手的生命值
    other_player_health -= damage
    print(f'玩家{current_player}使用{card_id}造成傷害,玩家{other_player}生命剩下{other_player_health}點')

    if other_player_health <= 0:
      # 如果對手生命值小於等於0,遊戲結束,當前玩家獲勝
      game_state["winner"] = current_player
      print(f'遊戲結束,贏家為{current_player}')

    # 移除已經使用的卡牌
    hand.remove(card_id)
  else:
    print(f'WARN: {card_id} 不在玩家手牌中')

  # 更新遊戲狀態
  updated_state = {
    f"player{other_player}_health": other_player_health,
    f"player{current_player}_hand": hand,
    "current_player": other_player,  # 切換回合
  }
  return updated_state

# NOTE: 自動遊玩,測試用
def autoplay_card(game_state):
  current_player = game_state["current_player"]
  other_player = 2 if current_player == 1 else 1  # 對手
  hand = game_state[f"player{current_player}_hand"]
  other_player_health = game_state[f"player{other_player}_health"]

  if len(hand) == 0:
    print(f'ERRO: 玩家手牌為空')
    return None

  card_id = hand.pop() # 取出手牌最後一張
  # 找到卡牌並計算傷害值(這裡可以根據卡牌的屬性進行計算)
  damage = card_id
  # 更新對手的生命值
  other_player_health -= damage
  print(f'玩家{current_player}使用{card_id}造成傷害,玩家{other_player}生命剩下{other_player_health}點')

  if other_player_health <= 0:
    # 如果對手生命值小於等於0,遊戲結束,當前玩家獲勝
    game_state["winner"] = current_player
    print(f'遊戲結束,贏家為{current_player}')

  # 更新遊戲狀態
  updated_state = {
    f"player{other_player}_health": other_player_health,
    f"player{current_player}_hand": hand,
    "current_player": other_player,  # 切換回合
  }
  return updated_state

def is_game_over(game_state):
  return game_state["winner"] != None

# 將遊戲狀態以玩家視角輸出,過濾掉非公開資訊
def game_state_view_as(game_state, player_id):
  other_player = 2 if player_id == 1 else 1  # 對手
  game_data = {
    "playerHealth": game_state[f"player{player_id}_health"],
    "playerHand": game_state[f"player{player_id}_hand"],
    "opponentHealth": game_state[f"player{other_player}_health"],
    "winner": game_state["winner"]
  }
  return game_data

if __name__ == "__main__":
  game_state = init_game() # 初始化遊戲
  turn = 1
  print(game_state)

  while not is_game_over(game_state):
    current_player = game_state['current_player']
    print(f'第{turn}回合,輪到玩家{current_player},請按下 enter 表示玩家抽牌', end='')
    _ = input() # 模擬玩家行動

    updated_state = draw_card(game_state)
    game_state.update(updated_state)
    print(f"玩家{current_player}的手牌:", game_state[f"player{current_player}_hand"])

    # updated_state = play_card(game_state, game_state[f"player{current_player}_hand"][0])
    updated_state = autoplay_card(game_state) # 純自動模式
    
    game_state.update(updated_state)

    print("玩家1的生命值:", game_state["player1_health"])
    print("玩家2的生命值:", game_state["player2_health"])
    turn += 1

    # print(game_state)

room.py

import asyncio
import websockets
import json

from game import init_game, draw_card, autoplay_card, game_state_view_as, is_game_over

server_host, server_port = 'localhost', 8765

# 存儲房間和玩家的資訊
player_conns = {} # conn_id => (room_id, player_id)
room_conns = {} # room_id => [conn...]
# NOTE: if conn_id in player_conns, then conn is gamming
games = {} # room_id => game

# 發送訊息給指定對象
async def notify(conn, payload):
  print(f'DEBUG: notify conn#{conn.id}', payload)
  await conn.send(payload)
  # print(f'DEBUG: _notify to conn#{conn.id} done')

# 發送訊息給群體
async def notify_all(conns, payload):
  for conn in conns:
    await notify(conn, payload)

# 通知玩家行動/等待
async def send_turn_notify(room_id):
  print(f'DEBUG: send_turn_notify room#{room_id}')
  game_state = games[room_id]
  conns = room_conns[room_id]
  current_player_id = game_state["current_player"]
  other_player_id = 2 if current_player_id == 1 else 1  # 對手
  current_player_conn = conns[current_player_id]
  other_player_conn = conns[other_player_id]

  payload1 = json.dumps({"message": "turn start"})
  await notify(current_player_conn, payload1)

  payload2 = json.dumps({"message": "wait"})
  await notify(other_player_conn, payload2)

# 遊戲開始
async def game_start(room_id):
  print(f'DEBUG: game_start room#{room_id}')

  conns = room_conns[room_id]
  player_conns[conns[0].id] = (room_id, 1)
  player_conns[conns[1].id] = (room_id, 2)
  games[room_id] = init_game()

  payload = json.dumps({"message": "game start"})
  await notify_all(conns, payload)
  room_id
  game_state_view_as1 = game_state_view_as(games[room_id], 1)
  state1 = json.dumps({"gameEvent": "setState", "gameData": game_state_view_as1})
  await notify(conns[0], state1)

  game_state_view_as2 = game_state_view_as(games[room_id], 2)
  state2 = json.dumps({"gameEvent": "setState", "gameData": game_state_view_as2})
  await notify(conns[1], state2)

  # await send_turn_notify(room_id)


# 定義 WebSocket 伺服器的處理邏輯
async def game_server(conn, path):
  print(f'DEBUG: create conn#{conn.id}')
  async for message in conn:
    data = json.loads(message)
    action = data.get("action")
    payload = data.get("payload")
    print('message', conn.id, action, payload)
    if action:
      if action == "create_room":
        print('DEBUG: room_conns', room_conns)
        # 創建房間
        room_id = len(room_conns) + 1
        room_conns[room_id] = [conn]
        print('create room', room_conns[room_id])

        response = {"message": f"Room created with ID {room_id}"}
        await conn.send(json.dumps(response))
      elif action == "join_room":
        print('DEBUG: room_conns', room_conns)
        # 加入房間
        room_id = payload.get("room_id")
        if room_id in room_conns and len(room_conns[room_id]) < 2:
          room_conns[room_id].append(conn)
          print('join room', room_conns[room_id])

          response = {"message": f"Joined room {room_id}"}
          await conn.send(json.dumps(response))
        else:
          response = {"error": "Room is full or does not exist"}
          await conn.send(json.dumps(response))
      elif action == "start_game":
        # 開始遊戲
        room_id = payload.get("room_id")
        if room_id in room_conns and len(room_conns[room_id]) == 2:
          response = {"message": "Starting the game..."}
          await conn.send(json.dumps(response))
          # 在這裡可以加入遊戲的邏輯
          await game_start(room_id)

        else:
          response = {"error": "Invalid room or not enough players"}
          await conn.send(json.dumps(response))
  print(f'DEBUG: conn#{conn.id} is disconnected')

# 啟動 WebSocket 伺服器
async def main():
    start_server = await websockets.serve(game_server, server_host, server_port)
    print(f"WebSocket server started on ws://{server_host}:{server_port}")

    await start_server.wait_closed()

if __name__ == "__main__":
    asyncio.run(main())

room.html

<!DOCTYPE html>
<html>
<head>
  <title>WebSocket Game Client</title>
</head>
<body>
  <h1>WebSocket Game</h1>
  <div id="messages"></div>
  <button onclick="sendMessage('create_room')">Create Room</button>
  <button onclick="joinRoom()">Join Room</button>
  <button onclick="startGame()">Start Game</button>
  
  <script>
    var ws;
    var messages = document.getElementById("messages");

    function showMessage(message) {
      var p = document.createElement("p");
      p.textContent = message;
      messages.appendChild(p);
    }

    function sendMessage(action, payload = {}) {
      if (ws && ws.readyState === WebSocket.OPEN) {
        var message = JSON.stringify({ action: action, payload: payload });
        ws.send(message);
      } else {
        showMessage("WebSocket not connected.");
      }
    }

    function joinRoom() {
      var room_id = parseInt(prompt("Enter room ID:"));
      if (isNaN(room_id)) {
        alert('room_id is not number')
        return
      }
      sendMessage('join_room', { room_id: room_id });
    }

    function startGame() {
      var room_id = parseInt(prompt("Enter room ID to start the game:"));
      if (isNaN(room_id)) {
        alert('room_id is not number')
        return
      }
      sendMessage('start_game', { room_id: room_id });
    }

    ws = new WebSocket("ws://localhost:8765");

    ws.onmessage = function(event) {
      var data = JSON.parse(event.data);
      if (data.message || data.error) {
        showMessage(data.message || data.error);
      }
      if (data.gameEvent) {
        console.log('gameEvent', data)
      }
    };
  </script>
</body>
</html>

說明

遊戲部分已經在 D23 的時候封裝成 module 了,但後來還想更進一步簡化操作的部分,所以多做了一個 autoplay_card() 這樣子玩家在呼叫這個 API 的時候就不用帶 card_id 參數,測試的時候會更簡單

感想

連線綁定的部分花了蠻多時間思考的,其實最大問題就是到底核心是「websocket」還是「game」的議題。後來其實沒有什麼結論,websockets 套件底下是以「連線」為主體去思考問題,如果要處理跟遊戲有關的功能就會需要相關的對照表。而遊戲伺服器那邊需要發送更新通知,它會需要對應的「連線」才能把資料送給對的玩家,所以兩邊都是互相依賴的狀況。目前是以「能動就好」的方式下去開發,各位看官可以根據自己的需求再微調程式架構

TODO

上述範例程式碼安置到 github 或其他環境


上一篇
D23 打磨遊戲程式碼
下一篇
D25 遊戲開始後的 client 端頁面 (未串接連線部分)
系列文
chatGPT 帶你從零開始寫 websocket 連線遊戲31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
sixwings
iT邦研究生 4 級 ‧ 2023-09-26 20:00:19

已補上完整遊戲實作範例

我要留言

立即登入留言